""" Utilities for data construction.

"""
import numpy as np
import pandas as pd
from geopy.distance import geodesic


#%% MECHANICAL RISK INDEX COMPUTATION FUNCTIONS -----------------------------------------
def geodesic_by_row(y):
    """Return the geodesic distance in feet between a row's two well points.

    Args:
        y: row-like object exposing ``surf_latitude``, ``surf_longitude``,
            ``botm_latitude`` and ``botm_longitude`` both as attributes and
            as keys (presumably a pandas Series from a row-wise ``apply``).

    Returns:
        ``np.nan`` if any of the four coordinates is NaN, otherwise the
        ``.feet`` value of ``geodesic`` between the (surf_latitude,
        surf_longitude) and (botm_latitude, botm_longitude) points.
    """
    coordinate_is_missing = [
        np.isnan(y.surf_latitude),
        np.isnan(y.surf_longitude),
        np.isnan(y.botm_latitude),
        np.isnan(y.botm_longitude),
    ]
    if any(coordinate_is_missing):
        return np.nan

    surface_point = (y['surf_latitude'], y['surf_longitude'])
    bottom_point = (y['botm_latitude'], y['botm_longitude'])
    return geodesic(surface_point, bottom_point).feet


def phi_1(x):
    """Return ``((total_depth + water_depth) / 1000) ** 2`` for the record ``x``.

    ``x`` is any mapping-like object with 'total_depth' and 'water_depth' keys.
    """
    combined_depth = x['total_depth'] + x['water_depth']
    return (combined_depth / 1000) ** 2


def phi_2(x):
    """Return ``(vertical_depth / 1000) ** 2 * (total_depth + horizontal_displacement) / vertical_depth``.

    ``x`` is any mapping-like object with 'vertical_depth', 'total_depth' and
    'horizontal_displacement' keys.
    """
    scaled_vertical_squared = (x['vertical_depth'] / 1000) ** 2
    reach_ratio = (
        (x['total_depth'] + x['horizontal_displacement']) / x['vertical_depth']
    )
    return scaled_vertical_squared * reach_ratio


def phi_3(x):
    """Return ``max_mud_weight ** 2 * (water_depth + vertical_depth) / vertical_depth``.

    ``x`` is any mapping-like object with 'max_mud_weight', 'water_depth' and
    'vertical_depth' keys.
    """
    mud_weight_squared = x['max_mud_weight'] ** 2
    depth_ratio = (x['water_depth'] + x['vertical_depth']) / x['vertical_depth']
    return mud_weight_squared * depth_ratio


def phi_4(x):
    """Return ``phi_1(x) * sqrt(n_strings + max_mud_weight / n_strings ** 2)``.

    ``x`` must provide the keys read by ``phi_1`` plus 'n_strings' and
    'max_mud_weight'.
    """
    depth_term = phi_1(x)
    # Precedence: only max_mud_weight is divided by n_strings squared.
    radicand = x['n_strings'] + x['max_mud_weight'] / x['n_strings'] ** 2
    return depth_term * np.sqrt(radicand)


def mri(x):
    """Return the sum of the 'phi_1' through 'phi_4' entries of ``x`` divided by 1000."""
    total = x['phi_1']
    for component in ('phi_2', 'phi_3', 'phi_4'):
        total = total + x[component]
    return total / 1000


#%% HELPER FUNCTIONS --------------------------------------------------------------------
def to_csv_return(x, path, **to_csv_kwargs):
    """Write ``x`` to .csv and return ``x`` itself.

    Returning the original object lets this be used with method
    chaining/piping to save intermediate datasets, e.g.
    ``df.pipe(to_csv_return, path, index=False)``.

    Args:
        x: object with a ``to_csv`` method (e.g. a dataframe).
        path: destination passed as the first argument of ``x.to_csv``.
        **to_csv_kwargs: optional keyword arguments forwarded unchanged to
            ``x.to_csv``; with none given, the call is ``x.to_csv(path)``.

    Returns:
        The same object ``x`` that was passed in.
    """
    x.to_csv(path, **to_csv_kwargs)
    return x


#%% CONTRACT CLEANING FUNCTIONS ---------------------------------------------------------
def get_days_from_description(x, multiplier_names):
    """Convert a leading "<number> <unit>" in a contract description into days.

    Hyphens are treated as spaces, so "3-year ..." and "3 year ..." parse the
    same way. The input record ``x`` is not modified.

    Args:
        x: mapping-like record with a 'description' entry.
        multiplier_names: mapping from a unit word (the second token of the
            description) to the number that unit is multiplied by.

    Returns:
        ``float(number) * multiplier_names[unit]``, or ``np.nan`` when the
        description is missing, not a string, has fewer than two tokens, does
        not start with a number, or uses a unit absent from
        ``multiplier_names``.
    """
    # Note: to get duration, go with min of next contract and implied end by description.
    try:
        tokens = x['description'].replace('-', ' ').split()
        number, word = tokens[0], tokens[1]
        return float(number) * multiplier_names[word]
    except (AttributeError, IndexError, KeyError, TypeError, ValueError):
        # Best-effort parse: anything that is not "<number> <known unit>" is missing.
        return np.nan


def get_end(x):
    """
    Return the contract end date used in the matching.

    The end is the day before the next contract starts; when there is no next
    contract (``next_start`` is null, i.e. the final contract) it is the start
    plus the number of days implied by the description.

    NOTE: the actual contract duration uses a more restrictive definition,
    end = min(description, next start), which is not what is returned here.

    """
    start = x['start']
    days_from_description = pd.to_timedelta(x['total_days_description'], unit='d')
    end_by_duration = start + days_from_description

    next_start = x['next_start']
    if not pd.isnull(next_start):
        # A following contract exists: end the day before it begins.
        return next_start - pd.to_timedelta(1, unit='d')

    # Last observation: fall back on the description-implied end.
    return end_by_duration


#%% STATE CONSTRUCTION FUNCTIONS --------------------------------------------------------
def construct_states(
        df, df_contracts, df_gas_by_time, utilization_status, nonutilization_status,
        sample_start='2000-01-01', sample_end='2015-06-30'):
    """Build fortnightly and monthly state tables from rig status spells.

    Every status spell in ``df`` is expanded to one row per day, rigs are
    counted per (date, spec), and the daily figures are averaged within each
    fortnight/month, combined with new-contract counts and merged with
    ``df_gas_by_time``.

    Args:
        df: dataframe of status. Reads the columns 'stat start', 'stat end',
            'rig name', 'status', 'spec' and 'id'. It is not modified.
        df_contracts: dataframe of contracts only. Reads 'spec',
            'new_contract', 'contract_start_fortnight' and
            'contract_start_month'.
        df_gas_by_time: mapping with keys 'fortnight' and 'month'; each value
            is a dataframe with a 'date' column that is matched against the
            period start dates.
        utilization_status: statuses counted as under contract.
        nonutilization_status: statuses that count towards the total but not
            towards the rigs under contract.
        sample_start: first date (inclusive) kept in the daily data.
        sample_end: last date (inclusive) kept in the daily data.

    Returns:
        Tuple ``(df_state_by_time, df_agg_by_time)`` of dicts keyed by
        'fortnight' and 'month'. ``df_agg_by_time[k]`` is indexed by
        (spec, period) with columns 'n_unemployed', 'utilization', 'n_total',
        'new_contract', 'n_available' and 'p_match'.
        ``df_state_by_time[k]`` is the same data with one
        '<variable>_<spec>' column per spec, inner-merged with
        ``df_gas_by_time[k]``.

    """

    # Get date ranges: one DatetimeIndex of daily dates per status spell. Spells
    # whose start/end cannot form a range get NaT instead of aborting the run.
    date_ranges = []
    for start, end in zip(df['stat start'], df['stat end']):
        try:
            date_ranges.append(pd.date_range(start, end, freq='D'))
        except (ValueError, TypeError, OverflowError):
            date_ranges.append(pd.NaT)
    # assign() works on a copy, so the caller's dataframe gains no 'date' column.
    df_daily = df.assign(date=date_ranges)

    # Get day by day statuses: number of rigs per (date, status, spec). A rig
    # appearing twice on the same date only keeps its first row.
    df_time = (
        df_daily
        .explode('date')
        .drop_duplicates(['rig name', 'date'], keep='first')
        .sort_values('date')
        .groupby(['date', 'status', 'spec'], as_index=False)['id']
        .count()
    )
    df_time['date'] = pd.to_datetime(df_time['date'])

    counted_status = [*utilization_status, *nonutilization_status]
    n_under_contract = (
        df_time[df_time['status'].isin(utilization_status)]
        .groupby(['date', 'spec'], as_index=False)['id']
        .sum()
        .rename(columns={'id': 'n_under_contract'})
    )
    n_total = (
        df_time[df_time['status'].isin(counted_status)]
        .groupby(['date', 'spec'], as_index=False)['id']
        .sum()
        .rename(columns={'id': 'n_total'})
    )
    # NOTE(review): a (date, spec) with no rig in a utilization status gets NaN
    # for n_under_contract from the left merge, so utilization and n_unemployed
    # are NaN there rather than 0 / n_total -- confirm this is intended.
    df_time = (
        df_time
        .merge(n_under_contract, on=['date', 'spec'], how='left')
        .merge(n_total, on=['date', 'spec'], how='left')
        .sort_values(['date', 'spec', 'status'])
    )
    df_time['utilization'] = df_time['n_under_contract'] / df_time['n_total']

    # Do aggregation: the counts are identical across statuses within a
    # (date, spec), so one row per pair is enough. Explicit copies keep the
    # column assignments below off a filtered view.
    df_time_by_date = df_time.drop_duplicates(['date', 'spec']).copy()
    df_time_by_date['n_unemployed'] = (
        df_time_by_date['n_total'] - df_time_by_date['n_under_contract']
    )
    in_sample = (
        (df_time_by_date['date'] >= pd.to_datetime(sample_start))
        & (df_time_by_date['date'] <= pd.to_datetime(sample_end))
    )
    df_time_by_date = (
        df_time_by_date[in_sample]
        .sort_values(['date', 'spec', 'status'])
        .copy()
    )

    # GET STATE DATA (NOTE THAT FINAL STATES DATA WILL BE SMOOTHED IN STATA) ------------
    # Get monthly index (first day of the month)
    year_month = df_time_by_date['date'].dt.strftime('%Y-%m')
    df_time_by_date['month'] = pd.to_datetime(year_month)

    # Get fortnightly indexing: days 1-14 map to the 1st, later days to the 15th
    day_suffix = np.where(df_time_by_date['date'].dt.day < 15, '-01', '-15')
    df_time_by_date['fortnight'] = pd.to_datetime(year_month + day_suffix)

    # Do aggregation
    df_agg_by_time = dict()
    df_state_by_time = dict()
    for period in ['fortnight', 'month']:
        start_col = f'contract_start_{period}'
        n_matches = (
            df_contracts
            .groupby([start_col, 'spec'], as_index=False)['new_contract']
            .sum()
        )
        # Columns are selected with a list: subsetting a groupby with a bare
        # tuple of names is rejected by current pandas.
        df_agg = (
            df_time_by_date
            .groupby([period, 'spec'], as_index=False)[
                ['n_unemployed', 'utilization', 'n_total']]
            .mean()
            .merge(
                n_matches,
                left_on=[period, 'spec'],
                right_on=[start_col, 'spec'],
                how='left'
            )
            .set_index(['spec', period])
            .drop([start_col], axis=1)
        )
        # Periods without any contract start have no match row: zero new contracts.
        df_agg['new_contract'] = df_agg['new_contract'].fillna(0.0)
        df_agg['n_available'] = df_agg['new_contract'] + df_agg['n_unemployed']
        df_agg['p_match'] = df_agg['new_contract'] / df_agg['n_available']
        df_agg_by_time[period] = df_agg

        # Move spec into the columns and flatten to '<variable>_<spec>' names.
        df_state = df_agg.unstack(level=0)
        df_state.columns = ['_'.join(col).strip() for col in df_state.columns.values]
        df_state_by_time[period] = (
            df_state
            .merge(df_gas_by_time[period], left_index=True, right_on='date')
            .reset_index()
            .drop(['index'], axis=1)
        )

    return df_state_by_time, df_agg_by_time
